package io.openmessaging.demo;

import io.openmessaging.*;

import java.io.File;
import java.io.FilenameFilter;
import java.util.concurrent.atomic.AtomicBoolean;

public class DefaultProducer implements Producer {

    private MessageFactory messageFactory = new DefaultMessageFactory();

    private KeyValue properties;

    private String storeDirPath;

    private static AtomicBoolean flag = new AtomicBoolean(false);

    public DefaultProducer(KeyValue properties) {
        this.properties = properties;
        storeDirPath = this.properties.getString("STORE_PATH");
        if (storeDirPath == null) {
            storeDirPath = this.properties.getString("store.path");
        }
        cleanStorePath(storeDirPath);
    }


    @Override
    public BytesMessage createBytesMessageToTopic(String topic, byte[] body) {
        return messageFactory.createBytesMessageToTopic(topic, body);
    }

    @Override
    public BytesMessage createBytesMessageToQueue(String queue, byte[] body) {
        return messageFactory.createBytesMessageToQueue(queue, body);
    }

    @Override
    public void start() {

    }

    @Override
    public void shutdown() {
        flush();
    }

    @Override
    public KeyValue properties() {
        return properties;
    }

    @Override
    public void send(Message message) {
        if (message == null) {
            throw new ClientOMSException("Message should not be null");
        }

        if (!(message instanceof BytesMessage)) {
            throw new ClientOMSException("Message is not a BytesMessage instance");
        }
        String topic = message.headers().getString(MessageHeader.TOPIC);
        String queue = message.headers().getString(MessageHeader.QUEUE);
        if ((topic == null && queue == null) || (topic != null && queue != null)) {
            throw new ClientOMSException(String.format("Queue:%s Topic:%s should put one and only one", true, queue));
        }

        String fileBaseName = topic != null ? "topic-" + topic : "queue-" + queue;
        try {
            StoreUtils.writeMessage(storeDirPath, fileBaseName, (BytesMessage) message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void send(Message message, KeyValue properties) {
        throw new UnsupportedOperationException("Unsupported");
    }

    @Override
    public Promise<Void> sendAsync(Message message) {
        throw new UnsupportedOperationException("Unsupported");
    }

    @Override
    public Promise<Void> sendAsync(Message message, KeyValue properties) {
        throw new UnsupportedOperationException("Unsupported");
    }

    @Override
    public void sendOneway(Message message) {
        throw new UnsupportedOperationException("Unsupported");
    }

    @Override
    public void sendOneway(Message message, KeyValue properties) {
        throw new UnsupportedOperationException("Unsupported");
    }

    @Override
    public BatchToPartition createBatchToPartition(String partitionName) {
        throw new UnsupportedOperationException("Unsupported");
    }

    @Override
    public BatchToPartition createBatchToPartition(String partitionName, KeyValue properties) {
        throw new UnsupportedOperationException("Unsupported");
    }

    @Override
    public void flush() {
        StoreUtils.flush();
    }


    private static void cleanStorePath(String path) {
        synchronized (DefaultProducer.class) {
            if (flag.compareAndSet(false, true)) {
                File rootDir = new File(path);
                for (File file : rootDir.listFiles(new FilenameFilter() {
                    @Override
                    public boolean accept(File dir, String name) {
                        if (dir != rootDir) {
                            return false;
                        }
                        if (name.startsWith("msg.")) {
                            return true;
                        }
                        return false;
                    }
                })) {
                    file.delete();
                }

                System.out.println("=================balabala的说法是否===============");
            }
        }
    }
}